import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "../../../components/UniversalTabs";

# Streaming in Hatchet

Hatchet can stream data from background workers in real time, letting you subscribe to workflow progress, relay individual step results, and send data from an in-progress step run. This lets you show users live updates and progress as a workflow runs, making your application more responsive and interactive.

## How It Works

When a workflow is triggered, Hatchet generates events for each step of the workflow, including the step's status, output, and relevant metadata. These events are generated by the background workers executing the workflow steps. By subscribing to this event stream, you can capture these events in real-time and, optionally, forward them to the frontend client.

Here's a high-level overview of the real-time progress streaming process:

1. The client triggers a workflow by sending a request to the backend API.
2. The backend API initiates the workflow using the Hatchet SDK and obtains a unique `workflowRunId`.
3. The backend API returns the `workflowRunId` to the client as a reference.
4. The client establishes a connection to a dedicated endpoint on the backend API, passing the `workflowRunId` as a parameter.
5. The backend API subscribes to the Hatchet event stream for the specified `workflowRunId`.
6. As the background workers execute the workflow steps, Hatchet generates events for each step, which are captured by the backend API through the event stream.
7. The backend API processes the events, extracts relevant information, and sends the data to the client in real-time using a streaming response.
8. The client receives the streamed data and updates the user interface accordingly, providing real-time progress updates.

If the connection is lost (e.g., page reload or transient network failure), the client can reconnect to the same endpoint and resume receiving new real-time updates by re-establishing the stream at step 4.
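The reconnect behavior can be sketched as a client loop that re-establishes the stream whenever the connection drops. This is a minimal, self-contained simulation: `FlakyBackend`, `connect`, and `follow_run` are hypothetical names standing in for your own backend endpoint and client code, not Hatchet SDK calls.

```python
import asyncio
from typing import AsyncIterator, List


class FlakyBackend:
    """Hypothetical stand-in for the backend streaming endpoint (step 4)."""

    def __init__(self, updates: List[str], drop_after: int):
        self._updates = list(updates)
        self._cursor = 0  # server-side progress, shared across connections
        self._drop_after = drop_after
        self.connections = 0

    async def connect(self, workflow_run_id: str) -> AsyncIterator[str]:
        """Stream updates; the first connection drops after `drop_after` updates."""
        self.connections += 1
        is_first = self.connections == 1
        sent = 0
        while self._cursor < len(self._updates):
            if is_first and sent == self._drop_after:
                raise ConnectionError("stream dropped")
            update = self._updates[self._cursor]
            self._cursor += 1
            sent += 1
            yield update


async def follow_run(backend: FlakyBackend, workflow_run_id: str,
                     max_attempts: int = 3) -> List[str]:
    """Collect updates, reconnecting to the same endpoint after a dropped stream."""
    received: List[str] = []
    for _ in range(max_attempts):
        try:
            async for update in backend.connect(workflow_run_id):
                received.append(update)
            return received  # the stream ended normally
        except ConnectionError:
            # A real client would wait with a backoff before reconnecting.
            await asyncio.sleep(0)
    raise RuntimeError("gave up reconnecting")
```

In this simulation no update is produced while the client is disconnected. With a real stream, a reconnecting client resumes with new updates only, so anything emitted during the gap would need to be fetched another way.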

## Listeners

Listeners subscribe to the event stream for a specific workflow run. They are asynchronous generators that yield events as they arrive from the event stream, so you can filter and transform the event data before sending it to the client. Listeners yield three kinds of events: workflow run events, step run events, and step stream events.

Here's an example of how to create a listener:

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
import json

async def listen_for_events():
    # Trigger the workflow, then subscribe to the events of the resulting run
    workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {"test": "test"})
    listener = hatchet.listener.stream(workflowRunId)

    async for event in listener:
        # Filter and transform event data here
        data = json.dumps({
            "type": event.type,
            "messageId": workflowRunId
        })
        print("data: " + data + "\n\n")
```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript copy
async function listenForEvents() {
  // Trigger the workflow, then subscribe to the events of the resulting run
  const workflowRunId = await hatchet.admin.runWorkflow("simple-workflow", {});
  const stream = await hatchet.listener.stream(workflowRunId);
  for await (const event of stream) {
    console.log("event received", event);
  }
}
```

  </Tabs.Tab>
  <Tabs.Tab>

```go copy
// Trigger the workflow, then subscribe to the events of the resulting run
workflowRunId, err := c.Admin().RunWorkflow("stream-event-workflow", &streamEventInput{
  Index: 0,
})

if err != nil {
  panic(err)
}

err = c.Subscribe().Stream(interruptCtx, workflowRunId, func(event client.StreamEvent) error {
  fmt.Println(string(event.Message))

  return nil
})

if err != nil {
  panic(err)
}
```

  </Tabs.Tab>
</UniversalTabs>
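The "filter and transform" step can be written as a second asynchronous generator that wraps the listener. The sketch below runs without a Hatchet connection: `FakeEvent`, `fake_listener`, and the event type strings are hypothetical stand-ins, chosen only to mirror the `type` and `payload` attributes used in the examples on this page.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import AsyncIterator, Iterable, List, Set


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an event yielded by a listener."""
    type: str
    payload: str


async def fake_listener(events: Iterable[FakeEvent]) -> AsyncIterator[FakeEvent]:
    """Stand-in for a listener: yields events one at a time."""
    for event in events:
        await asyncio.sleep(0)  # give control back to the event loop, as real I/O would
        yield event


async def to_client_messages(listener: AsyncIterator[FakeEvent], run_id: str,
                             wanted_types: Set[str]) -> AsyncIterator[str]:
    """Drop unwanted event types and serialize the rest as JSON strings."""
    async for event in listener:
        if event.type not in wanted_types:
            continue  # filter
        yield json.dumps({  # transform
            "type": event.type,
            "payload": event.payload,
            "messageId": run_id,
        })


async def collect(messages: AsyncIterator[str]) -> List[str]:
    """Helper that drains an async generator into a list."""
    return [message async for message in messages]
```

Because the wrapper is itself an asynchronous generator, it can be handed to anything that consumes the original listener, such as a streaming HTTP response.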

## Streaming from a Step Context

You can also stream events from a specific step context, enabling you to stream arbitrary events, progress, intermediate inference, or debugging information from a step.

<UniversalTabs items={['Python', 'Typescript']}>
  <Tabs.Tab>

```python
@hatchet.step()
def step1(self, context: Context):
    # Stream some data from the step context
    context.put_stream('hello from step1')
    # continue with the step run...
    return {"step1": "results"}
```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
async function step1(ctx: Context) {
  // Stream some data from the step context
  ctx.putStream("step1 stream");
  // continue with the step run...
  return { step1: "step1 results!" };
}
```

  </Tabs.Tab>
</UniversalTabs>

### Streaming Files

Hatchet supports streaming base64-encoded files as part of the event payload, allowing you to transfer small to medium-sized files (under 4 MB) between the backend and frontend without waiting for a step result. For larger files, consider uploading them to a file storage service and streaming the file URLs instead.
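Base64 encoding produces 4 output bytes for every 3 input bytes, so a file grows by roughly a third before it is streamed. The helper below is a small sketch for checking a file against the size limit up front. It assumes that the 4 MB limit means 4 MiB and applies to the encoded payload, neither of which is confirmed here; `MAX_STREAM_BYTES`, `encoded_size`, and `fits_in_stream` are illustrative names.

```python
import base64

# Assumption: the limit is 4 MiB and is measured on the base64-encoded payload.
MAX_STREAM_BYTES = 4 * 1024 * 1024


def encoded_size(raw_size: int) -> int:
    """Length in bytes of the padded base64 encoding of `raw_size` bytes."""
    return 4 * ((raw_size + 2) // 3)


def fits_in_stream(raw: bytes, limit: int = MAX_STREAM_BYTES) -> bool:
    """Return True if the base64-encoded form of `raw` stays under `limit`."""
    return encoded_size(len(raw)) < limit
```

A step could call `fits_in_stream(image_data)` before encoding and fall back to uploading the file and streaming its URL when the check fails.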

To stream a file from a step context, encode the file data as base64 and stream it as a payload:

<UniversalTabs items={['Python', 'Typescript']}>
  <Tabs.Tab>

```python
@hatchet.step()
def step1(self, context: Context):
    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Construct the path to the image file relative to the script's directory
    image_path = os.path.join(script_dir, "image.jpeg")

    # Load the image file
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()

    # Encode the image data as base64
    base64_image = base64.b64encode(image_data).decode('utf-8')

    # Stream the base64-encoded image data
    context.put_stream(base64_image)

    # continue with the step run...
    return {"step1": "results"}

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
import * as fs from "fs";
import * as path from "path";

async function step1(ctx: Context) {
  // Get the directory of the current script
  const scriptDir = path.dirname(__filename);

  // Construct the path to the image file relative to the script's directory
  const imagePath = path.join(scriptDir, "image.jpeg");

  // Load the image file
  const imageData = await fs.promises.readFile(imagePath);

  // Encode the image data as base64
  const base64Image = Buffer.from(imageData).toString("base64");

  // Stream the base64-encoded image data
  ctx.putStream(base64Image);

  // continue with the step run...
  return { step1: "results" };
}
```

  </Tabs.Tab>
</UniversalTabs>

For the listener, decode the base64-encoded payload and write it to a file:

<UniversalTabs items={['Python', 'Typescript']}>
  <Tabs.Tab>

```python
async def listen_for_files():
    load_dotenv()
    hatchet = new_client()
    workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {"test": "test"})
    listener = hatchet.listener.stream(workflowRunId)

    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Create the "out" directory if it doesn't exist
    out_dir = os.path.join(script_dir, "out")
    os.makedirs(out_dir, exist_ok=True)

    async for event in listener:
        if event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:
            # Decode the base64-encoded payload
            decoded_payload = base64.b64decode(event.payload)

            # Construct the path to the payload file in the "out" directory
            payload_path = os.path.join(out_dir, "payload.jpg")

            with open(payload_path, "wb") as f:
                f.write(decoded_payload)
```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
import * as fs from 'fs';
import * as path from 'path';
import { config } from 'dotenv';
import { Hatchet, StepRunEventType } from 'hatchet';

async function listenForFiles() {
  config();
  const hatchet = new Hatchet();
  const workflowRunId = await hatchet.admin.runWorkflow('ManualTriggerWorkflow', { test: 'test' });
  const listener = await hatchet.listener.stream(workflowRunId);

  // Get the directory of the current script
  const scriptDir = path.dirname(__filename);

  // Create the "out" directory if it doesn't exist
  const outDir = path.join(scriptDir, 'out');
  await fs.promises.mkdir(outDir, { recursive: true });

  for await (const event of listener) {
    if (event.type === StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM) {
      // Decode the base64-encoded payload
      const decodedPayload = Buffer.from(event.payload, 'base64');

      // Construct the path to the payload file in the "out" directory
      const payloadPath = path.join(outDir, 'payload.jpg');

      // Write the decoded payload to the file
      await fs.promises.writeFile(payloadPath, decodedPayload);
    }
  }
}
```

</Tabs.Tab>
</UniversalTabs>
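The listeners above write every stream event to the same `payload.jpg`, so a step that streams several files, or mixes files with plain text messages, would overwrite earlier output. One possible refinement, sketched here with a hypothetical `save_payload` helper, is to validate the base64 input and give each payload its own numbered file.

```python
import base64
import binascii
from pathlib import Path
from typing import Optional


def save_payload(payload: str, out_dir: Path, index: int) -> Optional[Path]:
    """Decode a base64 payload into its own file.

    Returns the written path, or None if the payload is not valid base64
    (for example, a plain text message streamed by the same step).
    """
    try:
        # validate=True rejects characters outside the base64 alphabet
        data = base64.b64decode(payload, validate=True)
    except binascii.Error:
        return None

    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"payload-{index}.bin"
    path.write_bytes(data)
    return path
```

Validation is only a heuristic: a short text message that happens to be valid base64 would still be decoded and written, so an explicit marker in the payload is more robust if a step streams both kinds of data.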

## Streaming by Additional Metadata

It is often helpful to stream events from multiple workflows at once (for example, child workflows spawned from a parent). To do this, specify an [additional metadata](/features/additional-metadata) key-value pair when running a workflow, then subscribe to all events from workflows that carry the same key-value pair.

Since additional metadata is propagated from parent to child workflows, this can be used to track all events that originate from a specific workflow run, including those from its children.

Here's an example of how to create a listener:

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
# Generate a random stream key to use to track all
# stream events for this workflow run.
streamKey = "streamKey"
streamVal = f"sk-{random.randint(1, 100)}"

# Specify the stream key as additional metadata
# when running the workflow.

# This key gets propagated to all child workflows
# and can have an arbitrary property name.

workflowRun = hatchet.admin.run_workflow(
    "Parent",
    {"n": 2},
    options={"additional_metadata": {streamKey: streamVal}},
)

# Stream all events for the additional meta key value
listener = hatchet.listener.stream_by_additional_metadata(streamKey, streamVal)

async for event in listener:
    print(event.type, event.payload)

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript copy
// Generate a random stream key to use to track all
// stream events for this workflow run.
const streamKey = "streamKey";
const streamVal = `sk-${Math.random().toString(36).substring(7)}`;

// Specify the stream key as additional metadata
// when running the workflow.

// This key gets propagated to all child workflows
// and can have an arbitrary property name.
await hatchet.admin.runWorkflow(
  "parent-workflow",
  {},
  { additionalMetadata: { [streamKey]: streamVal } },
);

// Stream all events for the additional meta key value
const stream = await hatchet.listener.streamByAdditionalMeta(
  streamKey,
  streamVal,
);

for await (const event of stream) {
  console.log("event received", event);
}
```

  </Tabs.Tab>
  <Tabs.Tab>

```go copy
// Generate a random stream key to use to track all
// stream events for this workflow run.
streamKey := "streamKey"
streamValue := fmt.Sprintf("stream-event-%d", rand.Intn(100)+1)

// Specify the stream key as additional metadata
// when running the workflow.

// This key gets propagated to all child workflows
// and can have an arbitrary property name.
_, err = c.Admin().RunWorkflow("stream-event-workflow", &streamEventInput{
  Index: 0,
},
  client.WithRunMetadata(map[string]interface{}{
    streamKey: streamValue,
  }),
)

if err != nil {
  panic(err)
}

// Stream all events for the additional meta key value
err = c.Subscribe().StreamByAdditionalMetadata(interruptCtx, streamKey, streamValue, func(event client.StreamEvent) error {
  fmt.Println(string(event.Message))
  return nil
})
```

</Tabs.Tab>
</UniversalTabs>
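To make the propagation rule concrete, here is a toy model of how a metadata subscription selects runs. It is only an illustration of the matching behavior described above, not how Hatchet implements it; `ToyRun`, `spawn_child`, and `runs_matching` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyRun:
    """Hypothetical stand-in for a workflow run; not a Hatchet SDK type."""
    name: str
    metadata: Dict[str, str] = field(default_factory=dict)

    def spawn_child(self, name: str) -> "ToyRun":
        # A child starts with a copy of its parent's additional metadata.
        return ToyRun(name, dict(self.metadata))


def runs_matching(runs: List[ToyRun], key: str, value: str) -> List[str]:
    """Names of the runs whose metadata carries the given key-value pair."""
    return [run.name for run in runs if run.metadata.get(key) == value]
```

In this model, any run carrying the same key-value pair matches, whether or not it descends from the parent. The examples above draw the value from a small random range, so a value that is unlikely to collide, such as a UUID, may be a safer choice when many runs are in flight.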

## Consuming Streams on Frontend

To consume a stream from the backend, create a Streaming Response endpoint to "proxy" the stream from the Hatchet workflow run.

First, write a generator to filter and transform the event data before sending it to the client:

```python
def event_stream_generator(workflowRunId):
    """Generator that yields events from the Hatchet event stream as server-sent events."""
    stream = hatchet.client.listener.stream(workflowRunId)
    for event in stream:
        # Filter and transform the event data that will be sent to the client here
        if event.type == "step_completed":
            data = json.dumps({
                "type": event.type,
                "payload": event.payload,
                "messageId": workflowRunId
            })
            # Each server-sent event is a "data: ..." line followed by a blank line
            yield "data: " + data + "\n\n"
```

Next, create a streaming `GET` endpoint that the client connects to in order to receive real-time progress updates:

```python
@app.get("/message/{messageId}")
async def stream(messageId: str):
    """In a normal application you might use the message ID to look up a workflowRunId.
    This simple example has no persistence, so the message ID is used directly as the
    workflowRunId. You might also look up the workflowRunId in a database and return
    the stored results if the message has already been processed."""
    workflowRunId = messageId
    return StreamingResponse(event_stream_generator(workflowRunId), media_type='text/event-stream')
```
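On the receiving side, the client has to split the response body back into individual events. In a browser this is typically handled by the built-in `EventSource` API. The sketch below shows the same parsing logic in Python for illustration, assuming each event is a single `data: ` line holding one JSON object, as produced by the generator above; `parse_sse` is a hypothetical helper.

```python
import json
from typing import Iterable, Iterator


def parse_sse(chunks: Iterable[str]) -> Iterator[dict]:
    """Reassemble server-sent events from arbitrarily split text chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A blank line ("\n\n") marks the end of one event
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])
```

Buffering matters because the network may deliver an event in several pieces, so a frame is only parsed once its terminating blank line has arrived.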

## Related Tutorials

- [Real-time Result Streaming with Hatchet and FastAPI](../tutorials/fastapi-react/result-streaming.mdx)
